# SQS consumer instrumented with OpenTelemetry; spans are exported over OTLP/HTTP.
import json
import logging
import os
import random
import time

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
    OTLPSpanExporter,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
# Target host and API token for the OTLP trace endpoint. Both can be overridden
# through the environment; the defaults are placeholders.
api_host = os.environ.get('API_HOST', 'guu84124.live.dynatrace.com')
api_token = os.environ.get('API_TOKEN', 'asdfasdfasdfasdfaasdf')

# Python joins strings with f-strings or "+"; the former ".string(x)." form is
# not valid Python and prevented the module from being parsed at all.
span_exporter = OTLPSpanExporter(
    endpoint=f"https://{api_host}/api/v2/otlp/v1/traces",
    headers={
        "Authorization": f"Api-Token {api_token}",
    },
)

resource = Resource.create({
    # customizable resource attributes
    "service.name": "otel_python_sqs_consumer",
    "service.version": "1.0.0",
})

# Register a provider that batches finished spans and ships them through the
# exporter configured above, then obtain the tracer used by the consumer.
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(BatchSpanProcessor(span_exporter))
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer_provider().get_tracer(__name__)
import boto3

# No access key / secret is passed explicitly: empty-string keys would be used
# as real (invalid) credentials. Leaving them out lets boto3 resolve
# credentials through its default chain (environment variables, shared
# config/credentials files, attached role, ...).
session = boto3.session.Session()
sqs = session.client('sqs', region_name="us-east-1")
_LOGGER = logging.getLogger(__name__)

# Maximum wait SQS accepts for one ReceiveMessage call. Long polling keeps the
# caller's endless loop from issuing back-to-back empty requests.
_LONG_POLL_SECONDS = 20


def _process_message(message, queue_url, propagator):
    """Trace, print and delete a single received SQS message.

    The ``detail`` object of the JSON-decoded message body is used as the
    propagation carrier, so the consumer span is created under the trace
    context found there (if any).

    Raises:
        Any error from JSON decoding, a missing ``detail``/``ReceiptHandle``
        key, or the ``delete_message`` call; the caller decides how to react.
    """
    body = json.loads(message['Body'])
    carrier = body['detail']
    print(carrier)
    ctx = propagator.extract(carrier=carrier)
    with tracer.start_as_current_span(
        "Read From SQS Queue",
        kind=trace.SpanKind.CONSUMER,
        context=ctx,
    ) as span:
        span.set_attribute("aws.service", "sqs/otel_queue")
        print(message)
        # Delete received message from queue
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=message['ReceiptHandle'],
        )
        span.set_status(status=trace.StatusCode.OK)


def main():
    """Poll the SQS queue once and process every message that was returned.

    Errors raised by ``receive_message`` itself propagate to the caller.
    A failure while handling one message is logged with its traceback and
    does not stop the remaining messages of the batch from being handled.
    """
    queue_url = 'https://sqs.us-east-1.amazonaws.com/<ACCOUNT>/otel_queue'
    # NOTE(review): VisibilityTimeout=0 appears to leave the message visible to
    # other consumers while it is handled here -- confirm this is intended.
    response = sqs.receive_message(
        QueueUrl=queue_url,
        AttributeNames=[
            'SentTimestamp',
        ],
        MessageAttributeNames=[
            'All',
        ],
        VisibilityTimeout=0,
        WaitTimeSeconds=_LONG_POLL_SECONDS,
    )
    propagator = TraceContextTextMapPropagator()
    # The 'Messages' key is absent when the poll returned nothing, which is
    # the normal idle case and not an error.
    for message in response.get('Messages', []):
        try:
            _process_message(message, queue_url, propagator)
        except Exception:
            # Best effort per message: report the failure instead of hiding it,
            # then carry on with the rest of the batch.
            _LOGGER.exception(
                "Failed to process SQS message %s", message.get('MessageId')
            )
if __name__ == '__main__':
    # Keep polling until the process is interrupted; Ctrl-C ends the loop
    # quietly instead of dumping a KeyboardInterrupt traceback.
    try:
        while True:
            main()
    except KeyboardInterrupt:
        pass
|